package fun.easycode.datastream;

import cn.hutool.core.util.IdUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import fun.easycode.datastream.util.MyBatisPlusUtil;
import fun.easycode.jointblock.core.CheckException;
import fun.easycode.jointblock.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.sql.Wrapper;
import java.util.Collections;

/**
 * Sink that publishes canal data entries to RocketMQ on behalf of one transformer.
 *
 * <p>Each transform has exactly one sink, and the sink is registered with canal as a
 * listener ({@link CanalListener}). Every entry received through {@link #onMessage(DataEntry)}
 * is enriched with the transformer's WHERE-clause SQL and sent as a message.
 *
 * <p>NOTE(review): this class exposes {@link #start()} but no matching shutdown for the
 * producer it creates; confirm whether the producer's lifecycle is managed elsewhere.
 *
 * @author xuzhen97
 * @param <T> type of the data carried by the incoming {@link DataEntry}
 * @param <R> second type parameter of the wrapped {@link IDataTransformer}; not used directly here
 */
@Slf4j
public class DataTransformerEntryMQSink<T, R> implements CanalListener<T>{
    private final IDataTransformer<T,R> transform;
    private final DataStreamProperties properties;
    private final DefaultMQProducer producer;

    /**
     * Creates the sink and its (not yet started) producer.
     *
     * @param transform  transformer this sink publishes for
     * @param properties configuration supplying the RocketMQ name server address and topic
     */
    public DataTransformerEntryMQSink(IDataTransformer<T,R> transform, DataStreamProperties properties) {
        this.transform = transform;
        this.properties = properties;
        // Both fields above must be assigned first: createProducer() reads them.
        this.producer = createProducer();
    }

    /**
     * Starts the producer.
     *
     * @throws MQClientException if the producer fails to start
     */
    public void start() throws MQClientException {
        producer.start();
    }

    /**
     * Attaches the transformer's WHERE-clause SQL to the entry (extra key
     * {@code transformWhereSql}), serializes the entry to JSON and sends it to the
     * configured transform topic.
     *
     * @param dataEntry entry to publish; it is mutated by the added extra
     * @throws CheckException   if the transformer's wrapper is neither a {@code QueryWrapper}
     *                          nor a {@code LambdaQueryWrapper} (including {@code null})
     * @throws RuntimeException if sending fails or the sending thread is interrupted;
     *                          the original exception is kept as the cause
     */
    public void sink(DataEntry<T> dataEntry){
        // NOTE(review): Wrapper resolves to java.sql.Wrapper through this file's imports, while
        // the checks in resolveWhereSql target MyBatis-Plus wrappers; confirm against the
        // declared return type of IDataTransformer#getWrapper.
        Wrapper wrapper = transform.getWrapper(Collections.singletonList(dataEntry.getData()));
        dataEntry.putExtra("transformWhereSql", resolveWhereSql(wrapper));

        // The report (output table) this message finally feeds. Messages are handled per
        // report: with enough queues each report gets its own queue, otherwise they all
        // pile up in queue 0 (behavior of MessageQueueSelectorImpl, per the original author).
        String outputTableName = transform.getParam().getOutputTableName();
        // TODO key: should become a business-level id so it can later be correlated with logs
        // for troubleshooting. This could be abstracted further at the cost of extra work.
        String key = IdUtil.getSnowflakeNextIdStr();
        Message message = buildMessage(dataEntry, key);
        try {
            // Ordered send: the output table name is handed to the queue selector.
            producer.send(message, new MessageQueueSelectorImpl(), outputTableName);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted while sending transform message, key=" + key
                            + ", outputTable=" + outputTableName, e);
        } catch (MQClientException | RemotingException | MQBrokerException e) {
            throw new RuntimeException(
                    "Failed to send transform message, key=" + key
                            + ", outputTable=" + outputTableName, e);
        }
    }

    /**
     * Renders the WHERE-clause SQL segment of the given wrapper.
     *
     * @param wrapper wrapper returned by the transformer
     * @return the custom SQL segment produced by {@link MyBatisPlusUtil}
     * @throws CheckException if the wrapper type is unsupported or the wrapper is {@code null}
     */
    private String resolveWhereSql(Wrapper wrapper) {
        if (wrapper instanceof QueryWrapper) {
            return MyBatisPlusUtil.getCustomSqlSegment((QueryWrapper) wrapper);
        }
        if (wrapper instanceof LambdaQueryWrapper) {
            return MyBatisPlusUtil.getCustomSqlSegment((LambdaQueryWrapper) wrapper);
        }
        // Null-safe: a null wrapper is reported as unsupported instead of failing with an NPE.
        String wrapperType = wrapper == null ? "null" : wrapper.getClass().getName();
        log.error("不支持的wrapper类型:{}", wrapperType);
        throw new CheckException("不支持的wrapper类型");
    }

    /**
     * Builds the message for one entry: topic from configuration, the transformer's mark as
     * the tag, the given key, and the entry's JSON as the body.
     *
     * @param dataEntry entry to serialize
     * @param key       message key
     * @return the message ready to be sent
     */
    private Message buildMessage(DataEntry<T> dataEntry, String key) {
        // Encode explicitly as UTF-8 so the payload does not depend on the JVM default charset.
        byte[] body = JacksonUtil.toJson(dataEntry).getBytes(StandardCharsets.UTF_8);
        return new Message(properties.getRocketMq().getDefaultTransformTopic(),
                transform.getMark(), key, body);
    }

    /**
     * Creates the producer.
     *
     * @return a configured, not yet started producer
     */
    private DefaultMQProducer createProducer(){
        // One producer per transform, so the producer is named after the transform's mark.
        DefaultMQProducer created = new DefaultMQProducer(transform.getMark());
        created.setNamesrvAddr(properties.getRocketMq().getNameSrvAddr());
        return created;
    }

    /**
     * Listener callback; forwards the entry to {@link #sink(DataEntry)}.
     *
     * @param entry entry delivered to this listener
     */
    @Override
    public void onMessage(DataEntry<T> entry) {
        sink(entry);
    }

    /**
     * @return the table name that {@link DataContext} resolves for the wrapped transformer
     */
    @Override
    public String getTableName() {
        return DataContext.getTableName(transform);
    }

    /**
     * @return the input class declared by the transformer's parameters
     */
    @Override
    public Class<T> getTargetClass() {
        return transform.getParam().getInputClass();
    }

    /**
     * @return the wrapped transformer's mark
     */
    @Override
    public String getMark() {
        return transform.getMark();
    }
}
